package com.flow.framework.schedule.job;

import com.flow.framework.common.constant.FrameworkCommonConstant;
import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.json.JsonObject;
import com.flow.framework.common.util.io.IoUtil;
import com.flow.framework.core.holder.SecurityContextHolder;
import com.flow.framework.core.holder.SystemVersionContextHolder;
import com.flow.framework.core.response.Response;
import com.flow.framework.core.system.helper.AsyncHelper;
import com.flow.framework.core.system.listener.lifecycle.ISystemLifecycleListener;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Abstract base implementation of a scheduled job.
 *
 * <p>A job can be triggered through two entry points: {@link #schedule(String)} (direct, in-process
 * invocation) and {@link #execute(String)} (the xxl-job handler callback). Both reset the logging
 * MDC, the security context and the system-version context before delegating to
 * {@link #handle(String)}, and clean them up again, together with the thread-bound job name,
 * once the run is over.
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/4/10
 */
@Slf4j
public abstract class BaseScheduleJob extends IJobHandler implements ISystemLifecycleListener {

    /**
     * Registry of failed executions: key is the job name (or the simple class name when no job
     * name is bound to the thread), value is the simple class name of the job.
     */
    private static final Map<String, String> EXECUTE_FAILED_JOB_NAME_AND_CLAZZ_NAME_MAP = new ConcurrentHashMap<>(16);

    /**
     * Job name bound to the current thread; removed at the end of every run.
     */
    private static final ThreadLocal<String> JOB_NAME_THREAD_LOCAL = new ThreadLocal<>();

    /**
     * Set to {@code false} by {@link #beforeShutdown()}. Volatile because it is written by the
     * thread driving the shutdown and read by the threads running the job.
     */
    private volatile boolean allowRunning = true;

    /**
     * The object {@link #handle(String)} is invoked on. Volatile because it is published by
     * {@link #cacheSelf(BaseScheduleJob)} and read by the threads running the job.
     */
    private volatile BaseScheduleJob currentJob;

    /**
     * Caches the job object. If the job is proxied, the proxy object should be cached so that
     * {@link #handle(String)} is invoked through the proxy.
     *
     * @param self the job instance (or its proxy) to delegate to
     */
    public void cacheSelf(BaseScheduleJob self) {
        this.currentJob = self;
    }

    /**
     * Returns the object previously stored with {@link #cacheSelf(BaseScheduleJob)}.
     *
     * @return the cached job, or {@code null} if none has been cached yet
     */
    protected final BaseScheduleJob getCurrentJob() {
        return this.currentJob;
    }

    /**
     * Returns the jobs whose last recorded execution failed.
     *
     * @return a mutable snapshot copy of the failure registry (registry key to simple class name)
     */
    public static Map<String, String> getExecuteFailedJobNameAndClazzName() {
        return new HashMap<>(EXECUTE_FAILED_JOB_NAME_AND_CLAZZ_NAME_MAP);
    }

    /**
     * The actual work performed when the job is triggered.
     *
     * @param params job parameters
     * @return the framework response; a {@code null} response is treated as a failure
     */
    protected abstract Response<String> handle(String params);

    /**
     * Binds the (main) job name to the current thread.
     *
     * @param jobName job name
     */
    public final void setJobName(String jobName) {
        JOB_NAME_THREAD_LOCAL.set(jobName);
    }

    /**
     * Returns the (main) job name bound to the current thread.
     *
     * @return the job name, or {@code null} if none is bound
     */
    public final String getJobName() {
        return JOB_NAME_THREAD_LOCAL.get();
    }

    /**
     * Runs the job in-process and records the outcome in the failure registry.
     *
     * <p>A successful response removes the job from the registry; a {@code null} response, a
     * failed response or an exception records it. Any exception is logged and rethrown.
     *
     * @param params job parameters
     * @return the response produced by {@link #handle(String)}, possibly {@code null}
     * @throws CheckedException      if the service is shutting down
     * @throws IllegalStateException if no job has been cached through {@link #cacheSelf(BaseScheduleJob)}
     */
    public final Response<String> schedule(String params) {
        String clazzName = this.getClass().getSimpleName();
        try {
            MDC.clear();
            String traceId = getTraceId();
            MDC.put(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY, traceId);
            if (!allowRunning) {
                log.error("job will be shutdown !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! clazz name : {}", clazzName);
                throw new CheckedException(SystemErrorCode.SERVICE_WILL_SHUTDOWN_ERROR);
            }

            SecurityContextHolder.clearAll();
            SystemVersionContextHolder.clear();

            Response<String> response = requireCurrentJob().handle(params);
            // The key is resolved after handle() so a job name bound during the run is honoured.
            if (null != response && response.isSuccess()) {
                EXECUTE_FAILED_JOB_NAME_AND_CLAZZ_NAME_MAP.remove(failedJobKey(clazzName));
            } else {
                EXECUTE_FAILED_JOB_NAME_AND_CLAZZ_NAME_MAP.put(failedJobKey(clazzName), clazzName);
            }
            return response;
        } catch (Exception e) {
            EXECUTE_FAILED_JOB_NAME_AND_CLAZZ_NAME_MAP.put(failedJobKey(clazzName), clazzName);
            log.error("local schedule error.", e);
            throw e;
        } finally {
            MDC.clear();
            SecurityContextHolder.clearAll();
            SystemVersionContextHolder.clear();
            JOB_NAME_THREAD_LOCAL.remove();
        }
    }

    /**
     * xxl-job entry point: runs the job and converts the outcome into a {@link ReturnT}.
     *
     * <p>Exceptions raised by the job are not propagated; they are logged and reported as a
     * failed result whose content is the stack trace. On success, every registry entry recorded
     * for this class is cleared.
     *
     * @param params job parameters
     * @return the execution result; its message carries the trace id of this run whenever a message is set
     * @throws Exception declared by the overridden method
     */
    @Override
    public final ReturnT<String> execute(String params) throws Exception {
        try {
            ReturnT<String> returnResult = new ReturnT<>();
            MDC.clear();
            String traceId = getTraceId();
            MDC.put(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY, traceId);
            String clazzName = this.getClass().getSimpleName();

            SecurityContextHolder.clearAll();
            SystemVersionContextHolder.clear();

            XxlJobLogger.log("job executing, trace id : {}", traceId);

            if (!allowRunning) {
                log.error("job will be shutdown !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! clazz name : {}", clazzName);
                returnResult.setCode(ReturnT.FAIL_CODE);
                returnResult.setMsg("job will be shutdown, trace id : " + traceId);
                returnResult.setContent(JsonObject.toString(Response.failed(SystemErrorCode.SERVICE_WILL_SHUTDOWN_ERROR)));
                return returnResult;
            }
            try {
                Response<String> response = requireCurrentJob().handle(params);
                if (null == response) {
                    returnResult.setCode(ReturnT.FAIL_CODE);
                } else {
                    if (response.isSuccess()) {
                        returnResult.setCode(ReturnT.SUCCESS_CODE);
                        removeExecuteFailedJobNames(clazzName);
                    } else {
                        returnResult.setCode(ReturnT.FAIL_CODE);
                    }
                    returnResult.setMsg(response.getMsg() + ", trace id : " + traceId);
                }
                returnResult.setContent(JsonObject.toString(response));
            } catch (Exception e) {
                log.error("rpc schedule error.", e);
                returnResult.setCode(ReturnT.FAIL_CODE);
                returnResult.setMsg(e.getMessage() + ", trace id : " + traceId);
                returnResult.setContent(getStackTrace(e));
            }
            return returnResult;
        } finally {
            MDC.clear();
            SecurityContextHolder.clearAll();
            SystemVersionContextHolder.clear();
            JOB_NAME_THREAD_LOCAL.remove();
        }
    }

    /**
     * Returns the cached job, failing with a descriptive error instead of a bare
     * {@link NullPointerException} when {@link #cacheSelf(BaseScheduleJob)} was never called.
     */
    private BaseScheduleJob requireCurrentJob() {
        BaseScheduleJob job = this.currentJob;
        if (null == job) {
            throw new IllegalStateException(
                    "current job is not cached, cacheSelf must be called before the job runs : " + this.getClass().getName());
        }
        return job;
    }

    /**
     * Resolves the failure-registry key for the current run. {@link ConcurrentHashMap} rejects
     * {@code null} keys, so the simple class name is used when no job name is bound to the thread.
     */
    private String failedJobKey(String clazzName) {
        String jobName = getJobName();
        return null != jobName ? jobName : clazzName;
    }

    /**
     * Removes every failure-registry entry recorded for the given class name.
     */
    private void removeExecuteFailedJobNames(String clazzName) {
        // Removing through the values view only drops entries that still map to this class name.
        EXECUTE_FAILED_JOB_NAME_AND_CLAZZ_NAME_MAP.values().removeIf(clazzName::equals);
    }

    /**
     * Renders the stack trace of the given throwable as a string.
     */
    private String getStackTrace(Throwable throwable) {
        StringWriter sw = new StringWriter();
        try (PrintWriter pw = new PrintWriter(sw)) {
            throwable.printStackTrace(pw);
            pw.flush();
            return sw.toString();
        }
    }

    /**
     * Supplies the trace id put into the logging MDC for a run.
     *
     * @return the trace id obtained from {@link AsyncHelper#randomAsyncTraceId()}; subclasses may override
     */
    protected String getTraceId() {
        return AsyncHelper.randomAsyncTraceId();
    }

    /**
     * Default implementation does nothing.
     */
    @Override
    public void init() {
        // intentionally empty
    }

    /**
     * Default implementation does nothing.
     */
    @Override
    public void destroy() {
        // intentionally empty
    }

    /**
     * Stops this job from accepting further runs: subsequent calls to {@link #schedule(String)}
     * and {@link #execute(String)} are rejected with a shutdown error.
     */
    @Override
    public void beforeShutdown() {
        allowRunning = false;
    }
}
